package com.example.zookeeper.config.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

/**
 * kafka 消息发送端
 */
@Slf4j
@Component
public class KafkaSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 发送消息
     *
     * @param topic 主题
     * @param message 消息内容
     */
    public void sendMessage(String topic, String message){
        ListenableFuture<SendResult<String, String>> sender = kafkaTemplate.send(new ProducerRecord<>(topic, message));
        //发送成功 异步发送回调
        SuccessCallback successCallback = result -> log.info("数据发送成功!");
        //发送失败  异步发送回调
        FailureCallback failureCallback = ex -> log.error("数据发送失败!");
        // 发送消息
        sender.addCallback(successCallback, failureCallback);
    }

    /**
     * 发送消息
     *
     * @param topic 主题
     * @param key 消息key
     * @param message 消息内容
     */
    public void sendMessage(String topic, String key, String message){
        ListenableFuture<SendResult<String, String>> sender = kafkaTemplate.send(new ProducerRecord<>(topic, key, message));
        //发送成功 异步发送回调
        SuccessCallback successCallback = result -> log.info("数据发送成功!");
        //发送失败  异步发送回调
        FailureCallback failureCallback = ex -> log.error("数据发送失败!");
        // 发送消息
        sender.addCallback(successCallback, failureCallback);
    }

}
